{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example on using the `imnet` CDR3 network analysis library"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import os\n",
    "os.environ['SPARK_HOME'] = os.path.join(os.path.expanduser('~'),'spark')\n",
    "\n",
    "import subprocess, re\n",
    "from IPython.display import HTML\n",
    "\n",
    "import sparkhpc\n",
    "from sparkhpc.sparkjob import LSFSparkJob\n",
    "\n",
    "import findspark\n",
    "findspark.init()\n",
    "import pyspark\n",
    "\n",
    "import numpy as np\n",
    "from scipy.sparse import csr_matrix \n",
    "\n",
    "import imnet"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Initialize a Spark cluster\n",
    "\n",
    "Depending on your system setup, this will vary. If you are running on an HPC cluster, we recommend using [`sparkhpc`](https://github.com/rokroskar/sparkhpc) to manage your spark clusters in the batch queue. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "sj = LSFSparkJob(ncores=10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sj.wait_to_start()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "sc = sparkhpc.start_spark(master=sj.master_url())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Check that Spark is working:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "sc.parallelize(range(sc.defaultParallelism)).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generating a random string sample"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "strings = imnet.random_strings.generate_random_sequences(5000)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['TSGLKQ',\n",
       " 'CATKSAKRFWWAAMWI',\n",
       " 'KAFKLFF',\n",
       " 'LGATNPE',\n",
       " 'VDHRRWKTQHNVTMNAFSK',\n",
       " 'MRQIKM',\n",
       " 'MILAARYNIEMYSMVF',\n",
       " 'YNLGMYASTVRT',\n",
       " 'NKCVNEMLIHMP',\n",
       " 'LAFENVSSTGTQC']"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "strings[:10]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Creating the distance matrix \n",
    "Now we create the weighted adjacency matrix, i.e. the distance matrix. If $S_i$ is the ith string, then the $i,j$ component of the matrix is the Levenshtein distance between strings $S_i$ and $S_j$, i.e. $D_{i,j}=L_d(S_i,S_j)$. We only keep the weights with $w_i = L_d(S_i) \\le L_{d,max}$."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:imnet.process_strings:number of strings 5000\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 4.93 s, sys: 0 ns, total: 4.93 s\n",
      "Wall time: 4.93 s\n"
     ]
    }
   ],
   "source": [
    "%%time \n",
    "mat_arr = np.array(list(imnet.process_strings.distance_matrix(strings, min_ld=1, max_ld=2)))\n",
    "mat = csr_matrix((mat_arr[:,2], (mat_arr[:,0], mat_arr[:,1])), (len(strings), len(strings)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<5000x5000 sparse matrix of type '<type 'numpy.int64'>'\n",
       "\twith 988 stored elements in Compressed Sparse Row format>"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mat"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this case, the calculations were done only on the local machine. If we pass a `SparkContext` to the `sc` keyword, the work is distributed across the Spark cluster. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:imnet.process_strings:number of strings 5000\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 57 ms, sys: 7 ms, total: 64 ms\n",
      "Wall time: 1.48 s\n"
     ]
    }
   ],
   "source": [
    "%%time \n",
    "mat_spark_arr = np.array(imnet.process_strings.distance_matrix(strings, min_ld=1, max_ld=2, sc=sc).collect())\n",
    "mat_spark = csr_matrix((mat_spark_arr[:,2], (mat_spark_arr[:,0], mat_spark_arr[:,1])), (len(strings), len(strings)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "<5000x5000 sparse matrix of type '<type 'numpy.int64'>'\n",
       "\twith 988 stored elements in Compressed Sparse Row format>"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "mat_spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The two matrices are identical: True\n"
     ]
    }
   ],
   "source": [
    "print 'The two matrices are identical: %s'%(not np.any((mat!=mat_spark).todense()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, the distance matrix can be turned into a weighted graph. The code below constructs the sparse distance matrix like we have done above and uses it to make a graph. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate a graph from the distance matrix"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:imnet.process_strings:number of strings 5000\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 65 ms, sys: 4 ms, total: 69 ms\n",
      "Wall time: 1.82 s\n"
     ]
    }
   ],
   "source": [
    "%time g = imnet.process_strings.generate_graph(strings, sc=sc, min_ld=1, max_ld=2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of edges: 988\n",
      "Number of nodes: 5000\n"
     ]
    }
   ],
   "source": [
    "print 'Number of edges: %d\\nNumber of nodes: %d'%(g.number_of_edges(), g.number_of_nodes())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['SYQLSDL',\n",
       " 'CTYELIDRDMVIPTVKW',\n",
       " 'YQKG',\n",
       " 'QPLELNVVPDS',\n",
       " 'NGEGGKSEV',\n",
       " 'VKGSYIFDCV',\n",
       " 'AQEDKVGCTQQCSFD',\n",
       " 'LDYTH',\n",
       " 'GFQDACRTFDP',\n",
       " 'YASFR']"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "g.nodes()[:10]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate degrees\n",
    "\n",
    "One of the key things we want to know is the degree distribution of the sequences. The usual graph methods only give us the overall degrees independent of edge weight, or in our case, edit distance. We have implemented a function `produce_degrees` that counts the degree for each node per $L_d$:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:imnet.process_strings:number of strings 5000\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 24 ms, sys: 13 ms, total: 37 ms\n",
      "Wall time: 4.05 s\n"
     ]
    }
   ],
   "source": [
    "%time degrees = imnet.process_strings.generate_degrees(strings, sc=sc, min_ld=1, max_ld=2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This is a 2D $(N \\times M)$ array, where $N$ is the number of strings and $M$ is the number of levels, i.e. if our maximum Levenshtein distance is 2, we have two levels. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([[0, 0, 1],\n",
       "       [0, 0, 0],\n",
       "       [0, 0, 0],\n",
       "       ..., \n",
       "       [0, 0, 0],\n",
       "       [0, 0, 0],\n",
       "       [0, 0, 0]], dtype=int32)"
      ]
     },
     "execution_count": 23,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "degrees"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The layout is such that the rows of the array correspond to the strings in our string map, e.g. lets say we want to know the degrees for string `YQKG`, we can do"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "string_map = {s:i for i,s in enumerate(strings)}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "array([0, 0, 4], dtype=int32)"
      ]
     },
     "execution_count": 28,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "string_id = string_map['YQKG']\n",
    "degrees[string_id]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This means that this node has four $L_d=2$ connections and zero $L_d=1$ connections"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Degrees from networkx match our calculation: True\n"
     ]
    }
   ],
   "source": [
    "g_degrees = g.degree()\n",
    "degree_match = np.all([g_degrees[s]==d.sum() for s,d in zip(strings,degrees)])\n",
    "print 'Degrees from networkx match our calculation: %s'%degree_match"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generate a distributed graph\n",
    "\n",
    "For very large samples or very deep graphs (high $L_d$) it might be prohibitive to generate graphs locally -- instead we could use the distributed graph library [`GraphFrames`](http://graphframes.github.io/index.html) that can be used with `Spark`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "INFO:imnet.process_strings:number of strings 5000\n"
     ]
    }
   ],
   "source": [
    "g_rdd = imnet.process_strings.generate_spark_graph(strings, sc, max_ld=2).cache()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The IDs of the vertices correspond to our `string_map` constructed above:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-------------------+\n",
      "| id|             string|\n",
      "+---+-------------------+\n",
      "|  0|             TSGLKQ|\n",
      "|  1|   CATKSAKRFWWAAMWI|\n",
      "|  2|            KAFKLFF|\n",
      "|  3|            LGATNPE|\n",
      "|  4|VDHRRWKTQHNVTMNAFSK|\n",
      "|  5|             MRQIKM|\n",
      "|  6|   MILAARYNIEMYSMVF|\n",
      "|  7|       YNLGMYASTVRT|\n",
      "|  8|       NKCVNEMLIHMP|\n",
      "|  9|      LAFENVSSTGTQC|\n",
      "| 10|               ASFD|\n",
      "| 11|      HCEMEGRLHLAIK|\n",
      "| 12|            DVMFPTA|\n",
      "| 13|               DRER|\n",
      "| 14|     MFYSMFPINHPCGY|\n",
      "| 15| HVHWRKGAVATYNEVYRM|\n",
      "| 16|         HFQKEDQGLH|\n",
      "| 17|  VAFHYSHFQTWHVQLYF|\n",
      "| 18|    CEWFSPTIHGVPYVD|\n",
      "| 19|       DACVSTHPSINF|\n",
      "+---+-------------------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "g_rdd.vertices.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+----+----+------+\n",
      "| src| dst|weight|\n",
      "+----+----+------+\n",
      "|1858|1030|     2|\n",
      "|4161| 447|     2|\n",
      "|4161|1442|     2|\n",
      "|4161|2841|     2|\n",
      "|2314|1875|     2|\n",
      "|2111|1839|     2|\n",
      "|4481| 705|     2|\n",
      "|4481|1118|     2|\n",
      "|4481|1413|     2|\n",
      "|4481|1899|     2|\n",
      "|4481|1936|     2|\n",
      "|4481|2884|     2|\n",
      "|4481|3062|     2|\n",
      "|4308|1476|     2|\n",
      "|4308|2555|     2|\n",
      "|4308|3595|     2|\n",
      "|4781| 190|     1|\n",
      "|4781|1678|     2|\n",
      "|4781|1946|     2|\n",
      "|4781|2928|     2|\n",
      "+----+----+------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "g_rdd.edges.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There are several graph algorithms we can try:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 69 ms, sys: 28 ms, total: 97 ms\n",
      "Wall time: 34.2 s\n"
     ]
    }
   ],
   "source": [
    "%time comp_rdd = g_rdd.connectedComponents()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-------------------+---------+\n",
      "| id|             string|component|\n",
      "+---+-------------------+---------+\n",
      "|  0|             TSGLKQ|        0|\n",
      "|  1|   CATKSAKRFWWAAMWI|        1|\n",
      "|  2|            KAFKLFF|        2|\n",
      "|  3|            LGATNPE|        3|\n",
      "|  4|VDHRRWKTQHNVTMNAFSK|        4|\n",
      "|  5|             MRQIKM|        5|\n",
      "|  6|   MILAARYNIEMYSMVF|        6|\n",
      "|  7|       YNLGMYASTVRT|        7|\n",
      "|  8|       NKCVNEMLIHMP|        8|\n",
      "|  9|      LAFENVSSTGTQC|        9|\n",
      "| 10|               ASFD|       10|\n",
      "| 11|      HCEMEGRLHLAIK|       11|\n",
      "| 12|            DVMFPTA|       12|\n",
      "| 13|               DRER|       10|\n",
      "| 14|     MFYSMFPINHPCGY|       14|\n",
      "| 15| HVHWRKGAVATYNEVYRM|       15|\n",
      "| 16|         HFQKEDQGLH|       16|\n",
      "| 17|  VAFHYSHFQTWHVQLYF|       17|\n",
      "| 18|    CEWFSPTIHGVPYVD|       18|\n",
      "| 19|       DACVSTHPSINF|       19|\n",
      "+---+-------------------+---------+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "comp_rdd.sort('id').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Top components:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+\n",
      "|component|count|\n",
      "+---------+-----+\n",
      "|       10|  487|\n",
      "|     1910|    3|\n",
      "|      641|    3|\n",
      "|      358|    2|\n",
      "|     2004|    2|\n",
      "|     2713|    2|\n",
      "|     2334|    2|\n",
      "|     2449|    2|\n",
      "|      269|    2|\n",
      "|     1848|    2|\n",
      "|        0|    2|\n",
      "|     1334|    2|\n",
      "|      871|    2|\n",
      "|      951|    2|\n",
      "|     2734|    2|\n",
      "|      536|    2|\n",
      "|     1646|    2|\n",
      "|      188|    2|\n",
      "|     1821|    2|\n",
      "|     1514|    2|\n",
      "+---------+-----+\n",
      "only showing top 20 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "gb = comp_rdd.groupBy('component')\n",
    "gb.count().sort('count', ascending=False).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "sc.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
